{
 "cells": [
  {
   "cell_type": "raw",
   "metadata": {
    "pycharm": {
     "name": "#%% raw\n"
    },
    "raw_mimetype": "text/restructuredtext"
   },
   "source": [
    ".. _nb_parallelization:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "# Parallelization"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "In practice, parallelization is essential and can significantly speed up optimization. \n",
    "For population-based algorithms, the solutions in a population can be evaluated independently of each other, \n",
    "so the evaluation step is the natural place to parallelize."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "## Vectorized Matrix Operations\n",
    "\n",
    "One approach is to use `NumPy` matrix operations, which have been used for almost all test problems implemented in *pymoo*.\n",
    "By default, `elementwise_evaluation` is set to `False`, which means that `_evaluate` receives a whole set of solutions at once.\n",
    "Thus, `x` is a matrix where each row is an individual and each column is a variable."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "from pymoo.core.problem import Problem\n",
    "\n",
    "class MyProblem(Problem):\n",
    "\n",
    "    def __init__(self, **kwargs):\n",
    "        super().__init__(n_var=10, n_obj=1, n_ieq_constr=0, xl=-5, xu=5, **kwargs)\n",
    "\n",
    "    def _evaluate(self, x, out, *args, **kwargs):\n",
    "        out[\"F\"] = np.sum(x ** 2, axis=1)\n",
    "\n",
    "problem = MyProblem()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "Summing along `axis=1` computes the objective value of every row (individual) in a single vectorized NumPy call instead of looping over the solutions in Python."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "from pymoo.algorithms.soo.nonconvex.ga import GA\n",
    "from pymoo.optimize import minimize\n",
    "\n",
    "res = minimize(problem, GA(), termination=(\"n_gen\", 200), seed=1)\n",
    "print('Vectorized:', res.exec_time)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "## Starmap Interface\n",
    "\n",
    "In general, **pymoo** allows passing a `starmap` object to be used for parallelization. \n",
    "The `starmap` interface follows the `multiprocessing.Pool.starmap` [function](https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing#multiprocessing.pool.Pool.starmap) of the Python standard library.\n",
    "This makes the parallelization flexible: any object that provides a compatible `starmap` function can be used. \n",
    "\n",
    "**IMPORTANT:** Please note that the problem needs to be evaluated element-wise (`elementwise_evaluation=True`; below, this is done by subclassing `ElementwiseProblem`), which implies that one call of `_evaluate` takes care of only a single solution.\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "from pymoo.core.problem import ElementwiseProblem\n",
    "\n",
    "class MyProblem(ElementwiseProblem):\n",
    "\n",
    "    def __init__(self, **kwargs):\n",
    "        super().__init__(n_var=10, n_obj=1, n_ieq_constr=0, xl=-5, xu=5, **kwargs)\n",
    "\n",
    "    def _evaluate(self, x, out, *args, **kwargs):\n",
    "        out[\"F\"] = (x ** 2).sum()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "Then, we can pass a `starmap` object to be used for parallelization."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "### Threads"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false
    },
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "from multiprocessing.pool import ThreadPool\n",
    "from pymoo.core.problem import StarmapParallelization\n",
    "from pymoo.algorithms.soo.nonconvex.ga import GA\n",
    "from pymoo.optimize import minimize\n",
    "\n",
    "\n",
    "# initialize the thread pool and create the runner\n",
    "n_threads = 4\n",
    "pool = ThreadPool(n_threads)\n",
    "runner = StarmapParallelization(pool.starmap)\n",
    "\n",
    "# define the problem by passing the starmap interface of the thread pool\n",
    "problem = MyProblem(elementwise_runner=runner)\n",
    "\n",
    "res = minimize(problem, GA(), termination=(\"n_gen\", 200), seed=1)\n",
    "print('Threads:', res.exec_time)\n",
    "\n",
    "pool.close()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "### Processes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "outputs_hidden": false
    },
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "import multiprocessing\n",
    "from pymoo.core.problem import StarmapParallelization\n",
    "from pymoo.algorithms.soo.nonconvex.ga import GA\n",
    "from pymoo.optimize import minimize\n",
    "\n",
    "\n",
    "# initialize the process pool and create the runner\n",
    "n_process = 8\n",
    "pool = multiprocessing.Pool(n_process)\n",
    "runner = StarmapParallelization(pool.starmap)\n",
    "\n",
    "# define the problem by passing the starmap interface of the process pool\n",
    "problem = MyProblem(elementwise_runner=runner)\n",
    "\n",
    "res = minimize(problem, GA(), termination=(\"n_gen\", 200), seed=1)\n",
    "print('Processes:', res.exec_time)\n",
    "\n",
    "pool.close()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "**Note:** Here, the overhead of serializing and transferring the data between processes is clearly visible."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "## Dask"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "A more advanced approach is to distribute the evaluation function across several workers. Several frameworks support this kind of code distribution. For our framework, we recommend using [Dask](https://dask.org).\n",
    "\n",
    "Documentation on how to set up the cluster is available [here](https://docs.dask.org/en/latest/setup/cli.html). You first start a scheduler somewhere and then connect workers to it. Then, a client object connects to the scheduler and distributes the jobs automatically for you."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "from pymoo.algorithms.soo.nonconvex.ga import GA\n",
    "from pymoo.optimize import minimize\n",
    "from pymoo.core.problem import DaskParallelization\n",
    "\n",
    "from dask.distributed import Client\n",
    "client = Client()\n",
    "client.restart()\n",
    "print(\"DASK STARTED\")\n",
    "\n",
    "# create the runner from the Dask client\n",
    "runner = DaskParallelization(client)\n",
    "\n",
    "# define the problem by passing the Dask runner\n",
    "problem = MyProblem(elementwise_runner=runner)\n",
    "\n",
    "res = minimize(problem, GA(), termination=(\"n_gen\", 200), seed=1)\n",
    "print('Dask:', res.exec_time)\n",
    "\n",
    "client.close()\n",
    "print(\"DASK SHUTDOWN\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "**Note:** Here, the overhead of transferring data to the Dask workers dominates the runtime. However, if your problem is computationally more expensive, this should no longer be the case."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "## Custom Parallelization"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "If you need more control over the parallelization process, the following example shows a fully customizable parallelization. The `_evaluate` function gets the whole set of solutions to be evaluated because, by default, `elementwise_evaluation` is disabled."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "### Threads"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "Thus, a thread pool can be initialized once (below, at module level before the `Problem` class is defined) and then be used inside `_evaluate` to speed up the evaluation.\n",
    "The code below does essentially what happens internally when the `starmap` interface of *pymoo* is used, but with an inline function definition and less overhead, which is why it tends to be slightly faster."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
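    "import numpy as np\n",
    "from multiprocessing.pool import ThreadPool\n",
    "from pymoo.core.problem import Problem\n",
    "\n",
    "# thread pool shared by all evaluations of the problem\n",
    "pool = ThreadPool(8)\n",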
    "\n",
    "class MyProblem(Problem):\n",
    "\n",
    "    def __init__(self, **kwargs):\n",
    "        super().__init__(n_var=10, n_obj=1, n_ieq_constr=0, xl=-5, xu=5, **kwargs)\n",
    "        \n",
    "    def _evaluate(self, X, out, *args, **kwargs):\n",
    "        \n",
    "        # define the function\n",
    "        def my_eval(x):\n",
    "            return (x ** 2).sum()\n",
    "            \n",
    "        # prepare the parameters for the pool\n",
    "        params = [[X[k]] for k in range(len(X))]\n",
    "\n",
    "        # calculate the function values in a parallelized manner and wait until done\n",
    "        F = pool.starmap(my_eval, params)\n",
    "        \n",
    "        # store the function values and return them.\n",
    "        out[\"F\"] = np.array(F)\n",
    "        \n",
    "problem = MyProblem()       "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "res = minimize(problem, GA(), termination=(\"n_gen\", 200), seed=1)\n",
    "print('Threads:', res.exec_time)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "pool.close()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "### Dask"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "from dask.distributed import Client\n",
    "\n",
    "from pymoo.core.problem import Problem\n",
    "from pymoo.optimize import minimize\n",
    "\n",
    "client = Client(processes=False)\n",
    "\n",
    "class MyProblem(Problem):\n",
    "\n",
    "    def __init__(self, *args, **kwargs):\n",
    "        super().__init__(n_var=10, n_obj=1, n_ieq_constr=0, xl=-5, xu=5,\n",
    "                         elementwise_evaluation=False, *args, **kwargs)\n",
    "\n",
    "    def _evaluate(self, X, out, *args, **kwargs):\n",
    "        def fun(x):\n",
    "            return np.sum(x ** 2)\n",
    "\n",
    "        jobs = [client.submit(fun, x) for x in X]\n",
    "        # np.vstack replaces np.row_stack, which is a deprecated alias in recent NumPy versions\n",
    "        out[\"F\"] = np.vstack([job.result() for job in jobs])\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "problem = MyProblem()\n",
    "\n",
    "res = minimize(problem, GA(), termination=(\"n_gen\", 200), seed=1)\n",
    "print('Dask:', res.exec_time)\n",
    "\n",
    "client.close()"
   ]
  }
 ],
 "metadata": {},
 "nbformat": 4,
 "nbformat_minor": 4
}
